Make the dependency queue entirely generic
authorAlex Crichton <alex@alexcrichton.com>
Tue, 29 Jul 2014 04:13:30 +0000 (21:13 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Sat, 2 Aug 2014 07:16:19 +0000 (00:16 -0700)
This paves the way for future intra-package parallelism as opposed to just
inter-package parallelism.

src/cargo/ops/cargo_rustc/job_queue.rs
src/cargo/util/dependency_queue.rs
src/cargo/util/mod.rs

index 4eb5a516e03e3fa88b452318937b87f43705667d..1c09b4dcb14a5bc1ad799134a471d68b4d490a74 100644 (file)
@@ -4,20 +4,20 @@ use term::color::YELLOW;
 
 use core::{Package, PackageId, Resolve};
 use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness};
-use util::{CargoResult, profile};
+use util::{CargoResult, Dependency, profile};
 
 use super::job::Job;
 
 pub struct JobQueue<'a, 'b> {
     pool: TaskPool,
-    queue: DependencyQueue<'a, (&'a Package, (Job, Job))>,
+    queue: DependencyQueue<&'a PackageId, (&'a Package, (Job, Job))>,
     tx: Sender<Message>,
     rx: Receiver<Message>,
     active: HashMap<&'a PackageId, uint>,
     config: &'b mut Config<'b>,
 }
 
-type Message = (PackageId, Freshness, CargoResult<Vec<Job>>);
+type Message = (PackageId, CargoResult<Vec<Job>>);
 
 impl<'a, 'b> JobQueue<'a, 'b> {
     pub fn new(config: &'b mut Config<'b>,
@@ -27,12 +27,10 @@ impl<'a, 'b> JobQueue<'a, 'b> {
         let (tx, rx) = channel();
         let mut queue = DependencyQueue::new();
         for &(pkg, _, _) in jobs.iter() {
-            queue.register(pkg);
+            queue.register(pkg.get_package_id());
         }
         for (pkg, fresh, job) in jobs.move_iter() {
-            let mut deps = resolve.deps(pkg.get_package_id())
-                                  .move_iter().flat_map(|a| a);
-            queue.enqueue(pkg, deps.collect(), fresh, (pkg, job));
+            queue.enqueue(&resolve, fresh, pkg.get_package_id(), (pkg, job));
         }
 
         JobQueue {
@@ -59,18 +57,18 @@ impl<'a, 'b> JobQueue<'a, 'b> {
         while self.queue.len() > 0 {
             loop {
                 match self.queue.dequeue() {
-                    Some((id, Fresh, (pkg, (_, fresh)))) => {
+                    Some((Fresh, id, (pkg, (_, fresh)))) => {
                         assert!(self.active.insert(id, 1u));
                         try!(self.config.shell().status("Fresh", pkg));
-                        self.tx.send((id.clone(), Fresh, Ok(Vec::new())));
+                        self.tx.send((id.clone(), Ok(Vec::new())));
                         try!(fresh.run());
                     }
-                    Some((id, Dirty, (pkg, (dirty, _)))) => {
+                    Some((Dirty, id, (pkg, (dirty, _)))) => {
                         assert!(self.active.insert(id, 1));
                         try!(self.config.shell().status("Compiling", pkg));
                         let my_tx = self.tx.clone();
                         let id = id.clone();
-                        self.pool.execute(proc() my_tx.send((id, Dirty, dirty.run())));
+                        self.pool.execute(proc() my_tx.send((id, dirty.run())));
                     }
                     None => break,
                 }
@@ -79,7 +77,7 @@ impl<'a, 'b> JobQueue<'a, 'b> {
             // Now that all possible work has been scheduled, wait for a piece
             // of work to finish. If any package fails to build then we stop
             // scheduling work as quickly as possibly.
-            let (id, fresh, result) = self.rx.recv();
+            let (id, result) = self.rx.recv();
             let id = self.active.iter().map(|(&k, _)| k).find(|&k| k == &id)
                          .unwrap();
             *self.active.get_mut(&id) -= 1;
@@ -90,12 +88,12 @@ impl<'a, 'b> JobQueue<'a, 'b> {
                         let my_tx = self.tx.clone();
                         let my_id = id.clone();
                         self.pool.execute(proc() {
-                            my_tx.send((my_id, fresh, job.run()));
+                            my_tx.send((my_id, job.run()));
                         });
                     }
                     if *self.active.get(&id) == 0 {
                         self.active.remove(&id);
-                        self.queue.finish(id, fresh);
+                        self.queue.finish(&id);
                     }
                 }
                 Err(e) => {
@@ -119,3 +117,9 @@ impl<'a, 'b> JobQueue<'a, 'b> {
         Ok(())
     }
 }
+
+impl<'a> Dependency<&'a PackageId, &'a Resolve> for &'a PackageId {
+    fn dependencies(&self, resolve: &&'a Resolve) -> Vec<&'a PackageId> {
+        resolve.deps(*self).move_iter().flat_map(|a| a).collect()
+    }
+}
index c65bbcd647dd4161baf0e261de271bbffa46f3ec..450cd7e61905216ed8fea03ebf860c3ce88c66cc 100644 (file)
@@ -5,32 +5,31 @@
 //! it to figure out when a dependency should be built.
 
 use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
 
-use core::{Package, PackageId};
-
-pub struct DependencyQueue<'a, T> {
-    /// A list of all known packages to build.
+pub struct DependencyQueue<K, V> {
+    /// A list of all known keys to build.
     ///
     /// The value of the hash map is list of dependencies which still need to be
     /// built before the package can be built. Note that the set is dynamically
     /// updated as more dependencies are built.
-    pkgs: HashMap<&'a PackageId, (HashSet<&'a PackageId>, T)>,
+    dep_map: HashMap<K, (HashSet<K>, V)>,
 
     /// A reverse mapping of a package to all packages that depend on that
     /// package.
     ///
     /// This map is statically known and does not get updated throughout the
     /// lifecycle of the DependencyQueue.
-    reverse_dep_map: HashMap<&'a PackageId, HashSet<&'a PackageId>>,
+    reverse_dep_map: HashMap<K, HashSet<K>>,
 
     /// A set of dirty packages.
     ///
     /// Packages may become dirty over time if their dependencies are rebuilt.
-    dirty: HashSet<&'a PackageId>,
+    dirty: HashSet<K>,
 
     /// The packages which are currently being built, waiting for a call to
     /// `finish`.
-    pending: HashSet<&'a PackageId>,
+    pending: HashMap<K, Freshness>,
 }
 
 /// Indication of the freshness of a package.
@@ -43,88 +42,85 @@ pub enum Freshness {
     Dirty,
 }
 
-impl<'a, T> DependencyQueue<'a, T> {
+/// A trait for discovering the dependencies of a piece of data.
+pub trait Dependency<K, C>: Hash + Eq + Clone {
+    fn dependencies(&self, cx: &C) -> Vec<K>;
+}
+
+impl<C, K: Dependency<K, C>, V> DependencyQueue<K, V> {
     /// Creates a new dependency queue with 0 packages.
-    pub fn new() -> DependencyQueue<'a, T> {
+    pub fn new() -> DependencyQueue<K, V> {
         DependencyQueue {
-            pkgs: HashMap::new(),
+            dep_map: HashMap::new(),
             reverse_dep_map: HashMap::new(),
             dirty: HashSet::new(),
-            pending: HashSet::new(),
+            pending: HashMap::new(),
         }
     }
 
     /// Registers a package with this queue.
     ///
     /// Only registered packages will be returned from dequeue().
-    pub fn register(&mut self, pkg: &'a Package) {
-        self.reverse_dep_map.insert(pkg.get_package_id(), HashSet::new());
+    pub fn register(&mut self, step: K) {
+        self.reverse_dep_map.insert(step, HashSet::new());
     }
 
     /// Adds a new package to this dependency queue.
     ///
     /// It is assumed that any dependencies of this package will eventually also
     /// be added to the dependency queue.
-    pub fn enqueue(&mut self, pkg: &'a Package, deps: Vec<&'a PackageId>,
-                   fresh: Freshness, data: T) {
+    pub fn enqueue(&mut self, cx: &C, fresh: Freshness, key: K, value: V) {
         // ignore self-deps
-        if self.pkgs.contains_key(&pkg.get_package_id()) { return }
+        if self.dep_map.contains_key(&key) { return }
 
         if fresh == Dirty {
-            self.dirty.insert(pkg.get_package_id());
+            self.dirty.insert(key.clone());
         }
 
         let mut my_dependencies = HashSet::new();
-        for &dep in deps.iter() {
-            if dep == pkg.get_package_id() { continue }
+        for dep in key.dependencies(cx).move_iter() {
+            if dep == key { continue }
             // skip deps which were filtered out as part of resolve
             if !self.reverse_dep_map.find(&dep).is_some() {
                 continue
             }
 
-            assert!(my_dependencies.insert(dep));
+            assert!(my_dependencies.insert(dep.clone()));
             let rev = self.reverse_dep_map.find_or_insert(dep, HashSet::new());
-            assert!(rev.insert(pkg.get_package_id()));
+            assert!(rev.insert(key.clone()));
         }
-        assert!(self.pkgs.insert(pkg.get_package_id(),
-                                 (my_dependencies, data)));
+        assert!(self.dep_map.insert(key, (my_dependencies, value)));
     }
 
     /// Dequeues a package that is ready to be built.
     ///
     /// A package is ready to be built when it has 0 un-built dependencies. If
     /// `None` is returned then no packages are ready to be built.
-    pub fn dequeue(&mut self) -> Option<(&'a PackageId, Freshness, T)> {
-        let pkg = match self.pkgs.iter()
-                                 .find(|&(_, &(ref deps, _))| deps.len() == 0)
-                                 .map(|(name, _)| *name) {
-            Some(pkg) => pkg,
+    pub fn dequeue(&mut self) -> Option<(Freshness, K, V)> {
+        let key = match self.dep_map.iter()
+                                    .find(|&(_, &(ref deps, _))| deps.len() == 0)
+                                    .map(|(key, _)| key.clone()) {
+            Some(key) => key,
             None => return None
         };
-        let (_, data) = self.pkgs.pop(&pkg).unwrap();
-        self.pending.insert(pkg);
-        let fresh = if self.dirty.contains(&pkg) {Dirty} else {Fresh};
-        Some((pkg, fresh, data))
+        let (_, data) = self.dep_map.pop(&key).unwrap();
+        let fresh = if self.dirty.contains(&key) {Dirty} else {Fresh};
+        self.pending.insert(key.clone(), fresh);
+        Some((fresh, key, data))
     }
 
     /// Returns the number of remaining packages to be built.
     pub fn len(&self) -> uint {
-        self.pkgs.len() + self.pending.len()
+        self.dep_map.len() + self.pending.len()
     }
 
     /// Indicate that a package has been built.
     ///
     /// This function will update the dependency queue with this information,
     /// possibly allowing the next invocation of `dequeue` to return a package.
-    ///
-    /// The `fresh` parameter is used to indicate whether the package was
-    /// actually rebuilt or not. If no action was taken, then the parameter
-    /// should be `Fresh`. If a package was rebuilt, `Dirty` should be
-    /// specified, and the dirtiness will be propagated properly to all
-    /// dependencies.
-    pub fn finish(&mut self, pkg: &'a PackageId, fresh: Freshness) {
-        assert!(self.pending.remove(&pkg));
-        let reverse_deps = match self.reverse_dep_map.find(&pkg) {
+    pub fn finish(&mut self, key: &K) {
+        let fresh = self.pending.pop(key).unwrap();
+        let reverse_deps = match self.reverse_dep_map.find(key) {
             Some(deps) => deps,
             None => return,
         };
@@ -132,7 +128,7 @@ impl<'a, T> DependencyQueue<'a, T> {
             if fresh == Dirty {
                 self.dirty.insert(dep.clone());
             }
-            assert!(self.pkgs.get_mut(dep).mut0().remove(&pkg));
+            assert!(self.dep_map.get_mut(dep).mut0().remove(key));
         }
     }
 }
index f1da499f42fb4f377c194fc2bbbd3bd130d1fad9..6b1aa72b20078a34bf0630459eddeee6aebd13ee 100644 (file)
@@ -8,6 +8,7 @@ pub use self::paths::realpath;
 pub use self::hex::{to_hex, short_hash};
 pub use self::pool::TaskPool;
 pub use self::dependency_queue::{DependencyQueue, Fresh, Dirty, Freshness};
+pub use self::dependency_queue::Dependency;
 pub use self::graph::Graph;
 pub use self::to_url::ToUrl;